Snowflakeで共有されたテーブルの変更を追跡して別テーブルを自動更新する
DA事業本部のnkhrです。Snowflakeではアカウント間でデータ共有ができます。本ブログでは、共有されたテーブルに対して変更追跡を行うためのストリームオブジェクトを設定し、変更履歴をもとに別テーブルを自動で更新する処理を検証しました。
Snowflakeでデータ共有する方法は3種類(Direct Share、Marketplace、Data Exchange)あります。本ブログで検証するデータ共有はDirect Shareです。
データ共有では、提供元をProviderアカウント、提供先をConsumerアカウントと呼びます。共有されたデータベースの実体はProviderアカウントにあります。Consumer側では、共有されたデータベース内にオブジェクトを作成したり、共有オブジェクトを修正することはできません。
今回の検証では、以下の図のイメージを実装します。
Providerアカウントの共有設定
共有したテーブルの変更履歴(STREAM)をConsumer側で追跡するためには共有するテーブルの「CHANGE_TRACKING」を「TRUE」にする必要があります。
共有設定の詳細は、以下のブログを参照してください。
Provider側では以下のオブジェクトを共有します。
- Database: test_db
- Schema: test
- Table: share
- Share object: test_s
実行コードは以下の通りです。
USE ROLE accountadmin; GRANT CREATE SHARE ON ACCOUNT TO sysadmin; USE ROLE SYSADMIN; CREATE DATABASE test_db; CREATE SCHEMA test; CREATE table test_db.test.share(id INTEGER, name VARCHAR(20), class_number INTEGER); INSERT INTO test.share VALUES (1, 'Juddy', 3), (2, 'Bob', 3), (3, 'Sam', 5); ALTER TABLE test.SHARE SET CHANGE_TRACKING = TRUE; CREATE OR REPLACE SHARE test_share; GRANT USAGE ON DATABASE test_db TO SHARE test_share; GRANT USAGE ON SCHEMA test_db.test TO SHARE test_share; GRANT SELECT ON TABLE test_db.test.share TO SHARE test_share; ALTER SHARE test_share SET ACCOUNTS = <sonsumer_accounts>;
Consumer側アカウントの設定
Consumer側では以下の作業を行います。
- Providerアカウントから共有されたShare Objectからデータベースを作成
- 共有テーブルに対してSTREAMを作成
- STREAMの変更をもとにConsumer側のテーブルを更新するTASKを作成
ストリームとタスクを利用したデータパイプラインの詳細については、下記のブログが参考になります。
ProviderアカウントのShare Objectからデータベース作成
Consumer側で共有データベースを参照するためには「IMPORTED PRIVILEGES」権限が必要です。共有データベースの利用権限はProvider側でShare Objectに設定されています。
USE ROLE ACCOUNTADMIN; CREATE DATABASE shared_db FROM SHARE <provider account>.test_share; GRANT IMPORTED PRIVILEGES ON DATABASE shared_db TO ROLE sysadmin;
共有データベースのテーブルに対してSTRAMを作成
共有されたデータベース内に、Consumer側でオブジェクトを作成できないため、STREAMはConsumer所有のデータベースに作成します。
Consumer側では、ユーザ一覧とユーザの変更日付を保持するuserlistテーブルを作成します。
USE ROLE SYSADMIN; CREATE DATABASE c_db; CREATE SCHEMA c_db.s_test; CREATE TABLE c_db.s_test.userlist(id, name, updated_at) AS SELECT id, name current_timestamp(0) FROM shared_db.test.share; CREATE STREAM c_db.s_test.mystream ON TABLE shared_db.test.share;
変更履歴からテーブルを更新するタスク作成
2021/12時点では、タスクの実行方法はスケジュール実行のみが提供されています。タスクの実行中に次のスケジュールの実行時間となった場合は、次の実行はスキップされます。TASK作成のWHEN句に「SYSTEM$STREAM_HAS_DATA(<stream name>)」
を指定することで、対象のストリーム内にレコードが存在しない場合はタスク実行をスキップしてくれます。
スケジュールの指定は「X分ごとの実行」または「Cron式による実行」が指定できます。CREATE TASKの指定パラメータについてはリンク先をご参照ください。
2021/12時点では、アカウント内でStarted状態(実行中)のタスクは上限10000です(Suspend状態の場合はこのカウントに含まれません)
CREATE OR REPLACE TASK c_db.s_test.update_userlist WAREHOUSE = TEST_WH SCHEDULE = '1 minute' ALLOW_OVERLAPPING_EXECUTION = FALSE USER_TASK_TIMEOUT_MS = 600000 WHEN SYSTEM$STREAM_HAS_DATA('c_db.s_test.mystream') AS MERGE INTO c_db.s_test.userlist u // ストリームに同じid, nameのレコードが2レコード存在するのは // class_numberカラムのみをUpdateした場合のため利用しないレコードとみなす USING ( SELECT id, name, METADATA$ACTION AS operation_type FROM c_db.s_test.mystream s1 WHERE NOT EXISTS ( SELECT 1 FROM c_db.s_test.mystream s2 WHERE s1.id = s2.id AND s1.name = s2.name GROUP BY id, name HAVING COUNT(*) > 1 )) s ON u.id = s.id AND u.name = s.name WHEN MATCHED AND s.operation_type = 'DELETE' THEN DELETE WHEN NOT MATCHED AND s.operation_type = 'INSERT' THEN INSERT (id, name, updated_at) VALUES (s.id, s.name, current_timestamp(0)) ; ALTER TASK IF EXISTS c_db.s_test.update_userlist RESUME;
動作確認
- Provider側での実行(2分ごとに各行を実行)
// ①追加 INSERT INTO test.share VALUES (4, 'Ken', 3), (5, 'Tonny', 5); // ②削除 DELETE FROM test.share WHERE id = 1; // ③更新 UPDATE test.share SET id = 1 WHERE id = 5; // ④classnumberカラムの更新 UPDATE test.share SET class_number = 10 WHERE id = 2;
- consumer側での実行(Provider側で実行後に1分経過以降)
SELECT * FROM c_db.s_test.userlist;
①の結果(既存3行+追加2行=5行)
②の結果(1行削除=>4行、id = 2, 3, 4, 5)
③の結果(更新⇒4行、id = 1, 2, 3, 4)
④の結果(変化なし、id = 1, 2, 3, 4)
まとめ
共有されたテーブルの変更履歴から自動で、Consumer側のテーブルを更新する方法を検証しました。SnowflakeではOrganizationやReader Account機能により、複数アカウントの作成が簡単にできるため、アカウント間でのデータ共有をうまく利用することで、データの信頼性や処理効率化が図れるようになったら良いですね。
以上、@nkhrでした。